
package com.niudong.esdemo.pscsoft;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Map;

/**
 * Demo that runs a scripted-metric aggregation against an Elasticsearch index through the
 * High Level REST Client and prints the response to standard output.
 *
 * <p>The client is held in a static field: call {@link #initEs()} before searching and
 * {@link #closeEs()} when done. The class performs no synchronization of its own.
 */
public class QueryByScriptedMetricAggregation {
    private static final Log log = LogFactory.getLog(QueryByScriptedMetricAggregation.class);

    /** Name under which the scripted-metric aggregation is registered and later looked up. */
    private static final String AGGREGATION_NAME = "agg";

    /** Shared client; {@code null} whenever no client is open. */
    private static RestHighLevelClient restClient;

    /**
     * Opens the client, runs the aggregation against the {@code myrow} index and closes the client.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        initEs();
        try {
            executeSearchRequest(buildSearchRequest("myrow"));
        } finally {
            // executeSearchRequest already closes the client; closeEs is idempotent, so this only
            // matters when building the request fails before the search is attempted.
            closeEs();
        }
    }

    /**
     * Creates the shared client for {@code localhost:9200} and {@code localhost:9201} over HTTP.
     * A client that is still open from an earlier call is closed first so it is not leaked.
     */
    public static void initEs() {
        closeEs();
        restClient = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));
        log.info("ElasticSearch init in service.");
    }

    /**
     * Closes the shared client if one is open. Safe to call repeatedly or before {@link #initEs()};
     * a failure while closing is logged and not propagated.
     */
    public static void closeEs() {
        RestHighLevelClient client = restClient;
        // Clear the field first so a second call never closes the same client twice.
        restClient = null;
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception e) {
            log.error("Failed to close the Elasticsearch client", e);
        }
    }

    /**
     * Builds a match-all search on the given index with a scripted-metric aggregation named
     * {@code agg} that counts how often each {@code _row} doc value occurs.
     *
     * @param indices name of the index to search
     * @return the configured search request
     */
    public static SearchRequest buildSearchRequest(String indices) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());

        SearchRequest searchRequest = new SearchRequest(indices);
        // NOTE(review): the routing value is the literal string "routing" - confirm this is intended.
        searchRequest.routing("routing");
        searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
        searchRequest.preference("_local");

        // init:    start each shard-level state with an empty list
        // map:     collect the _row doc value of every matching document
        // combine: turn the collected values into a value -> count map
        // reduce:  merge the per-state count maps into one map
        ScriptedMetricAggregationBuilder aggregation = AggregationBuilders
                .scriptedMetric(AGGREGATION_NAME)
                .initScript(new Script("state.datas = []"))
                .mapScript(new Script("state.datas.add(doc._row.value)"))
                .combineScript(new Script("Map map =new HashMap(); for (key in state.datas) {if(map.containsKey(key)){map.put(key,map.get(key)+1)}else{map.put(key,1)}} return map"))
                .reduceScript(new Script("Map map =new HashMap(); for (m in states){ Set keys = m.keySet();for(key in keys){if(map.containsKey(key)){map.put(key,m.get(key)+map.get(key))}else{map.put(key,m.get(key))}}} return map"));
        searchSourceBuilder.aggregation(aggregation);
        searchRequest.source(searchSourceBuilder);
        return searchRequest;
    }

    /**
     * Executes the search request synchronously, prints the response and then closes the shared
     * client. Failures are logged and not propagated to the caller.
     *
     * @param searchRequest the request to execute
     */
    public static void executeSearchRequest(SearchRequest searchRequest) {
        try {
            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info(searchResponse.toString());
            processSearchResponse(searchResponse);
        } catch (Exception e) {
            log.error("Search request failed", e);
        } finally {
            closeEs();
        }
    }

    /**
     * Prints the response metadata, shard statistics, hits and the scripted-metric result.
     *
     * @param searchResponse the response to print; {@code null} is ignored
     */
    private static void processSearchResponse(SearchResponse searchResponse) {
        if (searchResponse == null) {
            return;
        }
        RestStatus status = searchResponse.status();
        TimeValue took = searchResponse.getTook();
        Boolean terminatedEarly = searchResponse.isTerminatedEarly();
        boolean timedOut = searchResponse.isTimedOut();
        System.out.println("status is " + status + ";took is " + took + ";terminatedEarly is " + terminatedEarly
                + ";timedOut is " + timedOut);
        int totalShards = searchResponse.getTotalShards();
        int successfulShards = searchResponse.getSuccessfulShards();
        int failedShards = searchResponse.getFailedShards();
        System.out.println("totalShards is " + totalShards + ";successfulShards is " + successfulShards
                + ";failedShards is " + failedShards);

        for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
            System.out.println("fail is " + failure.toString());
        }

        printHits(searchResponse.getHits());
        printAggregation(searchResponse.getAggregations());
    }

    /** Prints the hit totals followed by one summary per returned hit. */
    private static void printHits(SearchHits hits) {
        TotalHits totalHits = hits.getTotalHits();
        long numHits = totalHits.value;
        float maxScore = hits.getMaxScore();
        System.out.println("numHits is " + numHits + ";maxScore is " + maxScore);

        for (SearchHit hit : hits.getHits()) {
            // SearchHit exposes the basic information of a hit: index, document id and score.
            String index = hit.getIndex();
            String id = hit.getId();
            float score = hit.getScore();
            System.out.println("docId is " + id + ";docIndex is " + index + ";docScore is " + score);

            String sourceAsString = hit.getSourceAsString();
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            // The source map can be absent, and _row is not guaranteed to be a String, so both
            // values are read as plain objects and only converted for printing.
            Object row = sourceAsMap == null ? null : sourceAsMap.get("_row");
            Object innerObject = sourceAsMap == null ? null : sourceAsMap.get("innerObject");
            System.out.println(String.valueOf(row));
            System.out.println("sourceAsString is " + sourceAsString + ";innerObject" + innerObject);
        }
    }

    /** Prints the scripted-metric result; does nothing when the aggregation is missing. */
    private static void printAggregation(Aggregations aggregations) {
        if (aggregations == null) {
            return;
        }
        ScriptedMetric agg = aggregations.get(AGGREGATION_NAME);
        if (agg == null) {
            return;
        }
        Object scriptedResult = agg.aggregation();
        System.out.println("scriptedResult :" + scriptedResult);
    }
}
